CONUS404 Temporal Aggregation#

Create daily averages from hourly data and write them out as a Zarr dataset.

import fsspec
import xarray as xr
import hvplot.xarray
import intake
import os
import warnings
from dask.distributed import LocalCluster, Client
warnings.filterwarnings('ignore')

Open dataset from Intake Catalog#

  • Select an on-prem dataset (read from /caldera) if running on-prem (Denali/Tallgrass)

  • Select cloud/OSN data on S3 if running elsewhere.

**NOTE:** This notebook reads data from OSN by default, which is free to access from any environment. If you change this notebook to use one of the CONUS404 datasets stored on S3 (options ending in `-cloud`), you will be pulling data from a requester-pays bucket. This means you must set up your AWS credentials, or the data will fail to load. Please note that reading the `-cloud` data from S3 may incur charges if you are reading data outside of the us-west-2 region or running the notebook outside of the cloud altogether. If you would like to access one of the `-cloud` options, add a code cell and run the following snippet to set up your AWS credentials:

os.environ['AWS_PROFILE'] = 'default'
%run ../environment_set_up/Help_AWS_Credentials.ipynb
# open the hytest data intake catalog
hytest_cat = intake.open_catalog("https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/hytest_intake_catalog.yml")
list(hytest_cat)
['conus404-catalog',
 'conus404-drb-eval-tutorial-catalog',
 'nhm-v1.0-daymet-catalog',
 'nhm-v1.1-c404-bc-catalog',
 'nhm-v1.1-gridmet-catalog',
 'nwis-streamflow-usgs-gages-onprem',
 'nwis-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-usgs-gages-onprem',
 'nwm21-streamflow-usgs-gages-cloud',
 'nwm21-streamflow-cloud',
 'nwm21-scores',
 'lcmap-cloud',
 'rechunking-tutorial-cloud']
# open the conus404 sub-catalog
cat = hytest_cat['conus404-catalog']
list(cat)
['conus404-hourly-onprem',
 'conus404-hourly-cloud',
 'conus404-hourly-osn',
 'conus404-daily-diagnostic-onprem',
 'conus404-daily-diagnostic-cloud',
 'conus404-daily-diagnostic-osn',
 'conus404-daily-onprem',
 'conus404-daily-cloud',
 'conus404-daily-osn',
 'conus404-monthly-onprem',
 'conus404-monthly-cloud',
 'conus404-monthly-osn']
## Select the dataset you want to read into your notebook and preview its metadata
dataset = 'conus404-hourly-osn' 
cat[dataset]
conus404-hourly-osn:
  args:
    consolidated: true
    storage_options:
      anon: true
      client_kwargs:
        endpoint_url: https://renc.osn.xsede.org
      requester_pays: false
    urlpath: s3://rsignellbucket2/hytest/conus404/conus404_hourly_202302.zarr
  description: 'CONUS404 Hydro Variable subset, 40 years of hourly values. These files
    were created wrfout model output files (see ScienceBase data release for more
    details: https://www.sciencebase.gov/catalog/item/6372cd09d34ed907bf6c6ab1). This
    dataset is stored on AWS S3 cloud storage in a requester-pays bucket. You can
    work with this data for free in any environment (there are no egress fees).'
  driver: intake_xarray.xzarr.ZarrSource
  metadata:
    catalog_dir: https://raw.githubusercontent.com/hytest-org/hytest/main/dataset_catalog/subcatalogs

Parallelize with Dask#

Some of the steps we will take are aware of parallel clustered compute environments using dask. We’re going to start a cluster now so that future steps can take advantage of this ability.

This is an optional step, but it speeds up data loading significantly, especially when accessing data from the cloud.

We have some documentation on how to start a Dask Cluster in different environments here.
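If none of those helper notebooks match your environment, a Dask cluster can also be started directly on the local machine. This is a minimal sketch using `dask.distributed` (the worker counts here are placeholders; tune them to your hardware):

```python
from dask.distributed import LocalCluster, Client

# Start a small local cluster; adjust workers/threads to your machine.
cluster = LocalCluster(n_workers=4, threads_per_worker=2)
client = Client(cluster)

# Open this URL in a browser to watch tasks execute.
print(client.dashboard_link)
```

The resulting `cluster` and `client` objects behave the same way as the ones created by the helper notebooks.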

%run ../environment_set_up/Start_Dask_Cluster_Nebari.ipynb
## If this notebook is not being run on Nebari/ESIP, replace the above 
## path name with a helper appropriate to your compute environment.  Examples:
# %run ../environment_set_up/Start_Dask_Cluster_Denali.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Tallgrass.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_Desktop.ipynb
# %run ../environment_set_up/Start_Dask_Cluster_PangeoCHS.ipynb
The 'cluster' object can be used to adjust cluster behavior.  i.e. 'cluster.adapt(minimum=10)'
The 'client' object can be used to directly interact with the cluster.  i.e. 'client.submit(func)' 
The link to view the client dashboard is:
>  https://nebari.esipfed.org/gateway/clusters/dev.c9da2d4b603f4a3da7652c4abea3e26e/status

Explore the dataset#

ds = cat[dataset].to_dask()
ds
<xarray.Dataset>
Dimensions:         (time: 368064, y: 1015, x: 1367, bottom_top_stag: 51,
                     bottom_top: 50, soil_layers_stag: 4, x_stag: 1368,
                     y_stag: 1016, snow_layers_stag: 3, snso_layers_stag: 7)
Coordinates:
    lat             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lat_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon             (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_u           (y, x_stag) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon_v           (y_stag, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time            (time) datetime64[ns] 1979-10-01 ... 2021-09-25T23:00:00
  * x               (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y               (y) float64 -2.028e+06 -2.024e+06 ... 2.024e+06 2.028e+06
Dimensions without coordinates: bottom_top_stag, bottom_top, soil_layers_stag,
                                x_stag, y_stag, snow_layers_stag,
                                snso_layers_stag
Data variables: (12/157)
    ACDEWC          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPR         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACDRIPS         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACECAN          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACEDIR          (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ACETLSM         (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    ...              ...
    ZNU             (bottom_top) float32 dask.array<chunksize=(50,), meta=np.ndarray>
    ZNW             (bottom_top_stag) float32 dask.array<chunksize=(51,), meta=np.ndarray>
    ZS              (soil_layers_stag) float32 dask.array<chunksize=(4,), meta=np.ndarray>
    ZSNSO           (time, snso_layers_stag, y, x) float32 dask.array<chunksize=(144, 7, 175, 175), meta=np.ndarray>
    ZWT             (time, y, x) float32 dask.array<chunksize=(144, 175, 175), meta=np.ndarray>
    crs             int32 ...
Attributes: (12/148)
    AER_ANGEXP_OPT:                  1
    AER_ANGEXP_VAL:                  1.2999999523162842
    AER_AOD550_OPT:                  1
    AER_AOD550_VAL:                  0.11999999731779099
    AER_ASY_OPT:                     1
    AER_ASY_VAL:                     0.8999999761581421
    ...                              ...
    WEST-EAST_PATCH_START_STAG:      1
    WEST-EAST_PATCH_START_UNSTAG:    1
    W_DAMPING:                       1
    YSU_TOPDOWN_PBLMIX:              0
    history:                         Tue Mar 29 16:35:22 2022: ncrcat -A -vW ...
    history_of_appended_files:       Tue Mar 29 16:35:22 2022: Appended file ...
ds.SNOW
<xarray.DataArray 'SNOW' (time: 368064, y: 1015, x: 1367)>
dask.array<open_dataset-986a06d6b28d681a9e9ca356f92aad3eSNOW, shape=(368064, 1015, 1367), dtype=float32, chunksize=(144, 175, 175), chunktype=numpy.ndarray>
Coordinates:
    lat      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
    lon      (y, x) float32 dask.array<chunksize=(175, 175), meta=np.ndarray>
  * time     (time) datetime64[ns] 1979-10-01 ... 2021-09-25T23:00:00
  * x        (x) float64 -2.732e+06 -2.728e+06 ... 2.728e+06 2.732e+06
  * y        (y) float64 -2.028e+06 -2.024e+06 -2.02e+06 ... 2.024e+06 2.028e+06
Attributes:
    description:   SNOW WATER EQUIVALENT
    grid_mapping:  crs
    long_name:     Snow water equivalent
    units:         kg m-2

Daily averages#

Time averages of any type are easy to do with xarray. Here we compute 24-hour averages and shift the time labels by 12 hours, so that each timestamp falls in the middle of its averaging period.

Digital Earth Africa has a great Working with Time in Xarray tutorial.
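As a self-contained illustration of the pattern (synthetic hourly data, not CONUS404), daily means with timestamps centered at noon can be computed like this:

```python
import numpy as np
import pandas as pd
import xarray as xr

# Two days of synthetic hourly data.
times = pd.date_range("2017-01-02", periods=48, freq="h")
da = xr.DataArray(np.arange(48.0), coords={"time": times}, dims="time")

# Resample to daily means, then shift the labels from midnight
# to the middle of each 24-hour averaging period.
daily = da.resample(time="1D").mean()
daily = daily.assign_coords(time=daily.time + pd.Timedelta("12h"))

print(daily.time.values)  # noon on each day
print(daily.values)       # [11.5, 35.5]
```

The same two steps are applied to the CONUS404 subset below.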

In the example below we just do a few days with a few variables as a quick demo.

%%time
ds_subset = ds[['T2','U10']].sel(time=slice('2017-01-02','2017-01-13'))
CPU times: user 12.7 ms, sys: 4.12 ms, total: 16.8 ms
Wall time: 16.4 ms
import pandas as pd

# Newer xarray/pandas versions removed the `loffset` and `base` resample
# keywords, so resample to daily means first, then shift the time labels
# by 12 hours to center them in each averaging period.
ds_subset_daily = ds_subset.resample(time="1D").mean()
ds_subset_daily = ds_subset_daily.assign_coords(
    time=ds_subset_daily.time + pd.Timedelta("12h"))
ds_subset_daily
ds_subset_daily.hvplot.quadmesh(x='lon', y='lat', rasterize=True,
                                geo=True, tiles='OSM', alpha=0.7, cmap='turbo')

Write daily values as a Zarr dataset (to onprem or cloud)#

You will need to turn the following cell from raw to code and update the file paths in order to save out your data.

Shutdown cluster#

client.close(); cluster.shutdown()